/**
 * Copyright [2020] [LiBo/Alex of copyright liboware@gmail.com ]
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.hyts.stream.engine.stream;

import com.google.common.collect.Maps;
import com.hyts.stream.engine.model.MetricEvent;
import com.hyts.stream.engine.model.WordEvent;
import com.hyts.stream.engine.source.CustomSource;
import com.hyts.stream.engine.toolkit.ExecutionEnvUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

/**
 * Sample Flink streaming job that reads {@link WordEvent}s from {@link CustomSource}, keys them
 * by the word concatenated with {@code timestamp % 2}, aggregates every key over 5-second
 * tumbling processing-time windows and prints each intermediate and final result to standard
 * output.
 *
 * <p>Timestamps and watermarks are assigned on the source stream, but the window assigner used
 * here is processing-time based. NOTE(review): the watermarks therefore presumably do not drive
 * window firing; switch to an event-time window assigner if that was the intent.
 *
 * @project-name:lhy-stream
 * @package-name:com.hyts.stream.engine.stream
 * @author:LiBo/Alex
 * @create-date:2022-05-12 23:53
 * @copyright:libo-alex4java
 * @email:liboware@gmail.com
 * @description:watermark / tumbling-window aggregation sample
 */
public class WateMarkStreamExecutor {

    /** Key inside {@code MetricEvent#getFields()} counting the elements folded into a window. */
    private static final String COUNT_FIELD = "count1";

    /** Lag subtracted from the highest timestamp seen so far to obtain the watermark. */
    private static final long MAX_TIME_LAG = 5000L;

    /** Size of the tumbling window, in seconds. */
    private static final long WINDOW_SECONDS = 5L;

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // No time characteristic is configured here. Per the original author's note the default
        // is processing time; when event time is selected instead, the events must carry a
        // timestamp or timestamps/watermarks must be assigned (as done on the source below).

        // Not used by the pipeline below. Kept because reading the constant may trigger
        // ExecutionEnvUtil's initialisation — NOTE(review): drop it if that is not needed.
        ParameterTool parameterTool = ExecutionEnvUtil.PARAMETER_TOOL;

        DataStream<WordEvent> events = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(new LaggingWatermarkAssigner());

        SingleOutputStreamOperator<WordEvent> windowed = events
                // Pass-through stage: forwards every element unchanged.
                .flatMap(new FlatMapFunction<WordEvent, WordEvent>() {
                    @Override
                    public void flatMap(WordEvent value, Collector<WordEvent> out) throws Exception {
                        out.collect(value);
                    }
                })
                // Splits every word into two keys depending on its timestamp modulo 2.
                .keyBy(new KeySelector<WordEvent, String>() {
                    @Override
                    public String getKey(WordEvent value) throws Exception {
                        return value.getWord() + (value.getTimestamp() % 2);
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(WINDOW_SECONDS)))
                .aggregate(new WordCountAggregator());

        // Pass-through stage: forwards every window result unchanged.
        SingleOutputStreamOperator<WordEvent> forwarded = windowed.map(new RichMapFunction<WordEvent, WordEvent>() {
            @Override
            public WordEvent map(WordEvent value) throws Exception {
                return value;
            }
        });

        forwarded.addSink(new RichSinkFunction<WordEvent>() {
            @Override
            public void invoke(WordEvent value, Context context) throws Exception {
                System.out.println("----sink1---" + value);
            }
        });

        env.execute("zhisheng —— flink window example");
    }

    /**
     * Periodic watermark assigner that takes the timestamp from the event itself and emits a
     * watermark trailing the highest timestamp seen so far by {@link #MAX_TIME_LAG}.
     */
    private static final class LaggingWatermarkAssigner implements AssignerWithPeriodicWatermarks<WordEvent> {

        private static final long serialVersionUID = 1L;

        /** Highest timestamp extracted so far; {@code Long.MIN_VALUE} until the first element. */
        private long maxSeenTimestamp = Long.MIN_VALUE;

        /**
         * @return a watermark at {@code maxSeenTimestamp - MAX_TIME_LAG}, or at
         *         {@code Long.MIN_VALUE} when that subtraction would underflow (which includes
         *         the case where no element has been seen yet)
         */
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // Guarding the whole underflow range (not just MIN_VALUE itself) prevents the
            // subtraction from wrapping around to a huge positive watermark.
            if (maxSeenTimestamp < Long.MIN_VALUE + MAX_TIME_LAG) {
                return new Watermark(Long.MIN_VALUE);
            }
            return new Watermark(maxSeenTimestamp - MAX_TIME_LAG);
        }

        /**
         * Uses the element's own timestamp and remembers the maximum seen so far.
         *
         * @param element the incoming event
         * @param previousElementTimestamp timestamp previously attached to the element; ignored
         * @return {@code element.getTimestamp()}
         */
        @Override
        public long extractTimestamp(WordEvent element, long previousElementTimestamp) {
            long timestamp = element.getTimestamp();
            maxSeenTimestamp = Math.max(timestamp, maxSeenTimestamp);
            return timestamp;
        }
    }

    /**
     * Sums {@code WordEvent#getCount()} per key and window and tracks, in the shared
     * {@link MetricEvent}'s field map under {@link #COUNT_FIELD}, how many elements were folded in.
     */
    private static final class WordCountAggregator implements AggregateFunction<WordEvent, WordEvent, WordEvent> {

        private static final long serialVersionUID = 1L;

        /** @return a fresh {@link WordEvent} carrying a {@link MetricEvent} built from an empty map */
        @Override
        public WordEvent createAccumulator() {
            WordEvent empty = new WordEvent();
            empty.setEvent(new MetricEvent(Maps.newHashMap()));
            return empty;
        }

        /**
         * Folds {@code event} into the running state.
         *
         * <p>The incoming event itself becomes the new accumulator: its count is replaced by the
         * running sum and it takes over the accumulator's {@link MetricEvent}. The window result
         * therefore is the last element of the window, enriched with the totals.
         *
         * @param event the element to add; mutated and returned
         * @param accumulator the state accumulated so far
         * @return {@code event}, now holding the accumulated state
         */
        @Override
        public WordEvent add(WordEvent event, WordEvent accumulator) {
            System.out.println(event + "---" + "--add--" + accumulator);

            event.setCount(accumulator.getCount() + event.getCount());

            MetricEvent metrics = accumulator.getEvent();
            try {
                // Starts the counter at 1 on first use, otherwise increments it.
                metrics.getFields().merge(COUNT_FIELD, 1, (current, one) -> toInt(current) + 1);
            } catch (RuntimeException e) {
                // Best-effort bookkeeping: a malformed counter must not fail the whole job.
                System.err.println("Failed to update " + COUNT_FIELD + " for " + event + ": " + e);
            }
            metrics.setName(event.getWord());
            event.setEvent(metrics);
            return event;
        }

        /** @return the accumulator itself, which already has the shape of the result */
        @Override
        public WordEvent getResult(WordEvent accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial states. An accumulator that has not seen any element yields the
         * other one unchanged; otherwise counts and element counters are summed into {@code a},
         * which keeps its own word and metric name.
         *
         * @param a first partial state; mutated and returned when both sides hold elements
         * @param b second partial state
         * @return an accumulator holding the merged state, never {@code null}
         */
        @Override
        public WordEvent merge(WordEvent a, WordEvent b) {
            int seenByA = occurrences(a);
            int seenByB = occurrences(b);
            if (seenByB == 0) {
                return a;
            }
            if (seenByA == 0) {
                return b;
            }
            a.setCount(a.getCount() + b.getCount());
            a.getEvent().getFields().put(COUNT_FIELD, seenByA + seenByB);
            return a;
        }

        /** Reads the element counter of an accumulator, treating a missing entry as zero. */
        private static int occurrences(WordEvent accumulator) {
            Object raw = accumulator.getEvent().getFields().get(COUNT_FIELD);
            return raw == null ? 0 : toInt(raw);
        }

        /** Parses a counter value via its string form, as the field map is loosely typed. */
        private static int toInt(Object value) {
            return Integer.parseInt(String.valueOf(value));
        }
    }
}
